package com.stream

import java.util.Properties

import com.stream.utils.TimeUtil
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
/**
  * JSONKeyValueDeserializationSchema deserializes JSON Kafka records into
  * ObjectNode instances whose fields can be read via objectNode.get("field").
  * Its constructor takes a single boolean flag: `true` means each record also
  * carries Kafka metadata (offset, partition, topic) alongside the payload;
  * `false` means only the payload itself is returned.
  */
object FlinkJSONKeyValueDeserializationSchema {

  /**
    * Entry point: consumes JSON records from the Kafka topic "textTopic",
    * extracts (id, formatted-create-time) pairs and counts occurrences per
    * key over 20-second tumbling windows, printing the results to stdout.
    */
  def main(args: Array[String]): Unit = {
    println("===================================FlinkJSONKeyValueDeserializationSchema=====================开始")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * With checkpointing enabled, the Flink Kafka consumer periodically
      * checkpoints Kafka offsets together with operator state. On failure
      * the job restores from the latest checkpoint and resumes consuming
      * from the last committed offsets.
      */
    env.enableCheckpointing(5000) // checkpoint interval: 5000 ms

    // Event-time characteristic intentionally left disabled:
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("zookeeper.connect", "192.168.86.101:2181,192.168.86.102:2181,192.168.86.103:2181")
    props.setProperty("bootstrap.servers", "192.168.86.101:9092")
    props.setProperty("group.id", "stream_flink_test")

    // `true` => each ObjectNode also contains Kafka metadata (offset,
    // partition, topic); the record payload is available under "value".
    val consumer: FlinkKafkaConsumer010[ObjectNode] =
      new FlinkKafkaConsumer010[ObjectNode]("textTopic", new JSONKeyValueDeserializationSchema(true), props)

    val jsonDstream: DataStream[ObjectNode] = env.addSource(consumer)

    val result = jsonDstream
      .map { obj =>
        // asText() already returns String — the original appended a
        // redundant .toString, removed here.
        val createTime = obj.get("value").get("create_time").asText()
        // NOTE(review): field name "create_targ" is bound to `id` — looks
        // like a typo (create_target? id?); confirm against producer schema.
        val id = obj.get("value").get("create_targ").asText()
        // Truncate the timestamp to minute precision for keying.
        val cTime: String = TimeUtil.timeType2Type(createTime, "yyyy-MM-dd HH:mm:ss SSS", "yyyy-MM-dd HH:mm")
        (id, cTime, createTime)
      }
      .map(f => ((f._1, f._2), 1))
      .keyBy(0)                      // key by the (id, minute) tuple
      .timeWindow(Time.seconds(20))  // 20-second tumbling window
      .sum(1)                        // count per key within each window

    result.print()
    // Original job name had a stray trailing space — trimmed.
    env.execute("FlinkJSONKeyValueDeserializationSchema")
  }
}

